﻿/**
* CRL
*/
using CRL.Core;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace CRL.RedisProvider
{
    internal class StackExchangeClient : IRedisClient
    {
        //private static readonly string Coonstr = RedisClient.GetRedisConn();
        //private static object _locker = new Object();
        private static System.Collections.Concurrent.ConcurrentDictionary<string, ConnectionMultiplexer> _cache = new System.Collections.Concurrent.ConcurrentDictionary<string, ConnectionMultiplexer>();

        public static ConnectionMultiplexer GetInstance(string name)
        {
            if (string.IsNullOrEmpty(name))
            {
                name = "default__";
            }

            var func = ConfigBuilder.GetConfig<Func<string, string>>("redisConn");
            if (func == null)
            {
                throw new Exception("请实现ConfigBuilder UseRedis");
            }
            var coonstr = func.Invoke(name);
            if (string.IsNullOrEmpty(coonstr))
            {
                throw new Exception("请实现RedisClient.GetRedisConn");
            }
            var a = _cache.TryGetValue(coonstr, out ConnectionMultiplexer instance);
            if (a && instance.IsConnected)
            {
                return instance;
            }
            if (coonstr.Contains("@"))
            {
                var arry = coonstr.Split(',');
                var arry1 = arry[0].Split('@');
                var pass = "";
                var ip = "";
                if (arry1.Length > 1)
                {
                    pass = arry1[0];
                    ip = arry1[1];
                }
                else
                {
                    ip = arry1[0];
                }
                var options = ConfigurationOptions.Parse(ip);
                options.Password = pass;
                options.SyncTimeout = 10000;
                instance = ConnectionMultiplexer.Connect(options);
            }
            else
            {
                instance = ConnectionMultiplexer.Connect(coonstr);
            }
            _cache.TryRemove(coonstr, out var _);
            _cache.TryAdd(coonstr, instance);
            //注册如下事件
            //_instance.ConnectionFailed += MuxerConnectionFailed;
            //_instance.ConnectionRestored += MuxerConnectionRestored;
            //_instance.ErrorMessage += MuxerErrorMessage;
            //_instance.ConfigurationChanged += MuxerConfigurationChanged;
            //_instance.HashSlotMoved += MuxerHashSlotMoved;
            //_instance.InternalError += MuxerInternalError;
            return instance;
        }
        int _db = -1;
        string _name;
        internal StackExchangeClient(string name, int db = -1)
        {
            _db = db;
            _name = name;
        }

        /// <summary>
        /// 
        /// </summary>
        /// <returns></returns>
        public IDatabase GetDatabase()
        {
            return GetInstance(_name).GetDatabase(_db);
        }
        public List<string> SearchKey(string keyPattern, int pageSize = 10)
        {
            var db = _db < 0 ? 0 : _db;
            var result = (string[])GetDatabase().ScriptEvaluate(
               LuaScript.Prepare("local res=redis.call('scan',@db,'match',@key,'count',@size) return res[2]"),
               new { key = keyPattern, size = pageSize, db });
            return result.ToList();
        }

        public T KGet<T>(string key)
        {
            return KGet<T>(key, out var find);
        }
        public T KGet<T>(string key, out bool find)
        {
            find = false;
            var str = KGet(key);
            find = str != null;
            if (find)
            {
                return Deserialize<T>(str);
            }
            return default(T);
        }
        /// <summary>
        /// 获取原始字符串
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public string KGet(string key)
        {
            return GetDatabase().StringGet(key);
        }

        public Dictionary<string, T> GetAll<T>(IEnumerable<string> keys)
        {
            var dic = new Dictionary<string, T>();
            var batch = GetDatabase().CreateBatch();
            var valueList = new Dictionary<string, Task<RedisValue>>();
            foreach (var key in keys)
            {
                valueList.Add(key, batch.StringGetAsync(key));
            }
            batch.Execute();
            foreach (var kv in valueList)
            {
                var result = kv.Value.Result;
                var value = default(T);
                if (result.HasValue)
                {
                    value = Deserialize<T>(result);
                }
                dic.Add(kv.Key, value);
            }
            return dic;
        }

        /// <summary>
        /// 设置缓存
        /// </summary>
        /// <param name="key"></param>
        /// <param name="value"></param>
        public bool KSet(string key, object value, TimeSpan? expireIn)
        {
            return GetDatabase().StringSet(key, Serialize(value), expireIn);
        }
        public void KSet(Dictionary<string, object> values)
        {
            BatchWrite(values, (batch, kv) =>
            {
                batch.StringSetAsync(kv.Key, Serialize(kv.Value));
            });
        }
        public bool ContainsKey(string key)
        {
            return GetDatabase().KeyExists(key);
        }
        public void KSetEntryIn(string key, TimeSpan expiresTime)
        {
            GetDatabase().KeyExpire(key, expiresTime);
        }
        #region hash

        /// <summary>
        /// 移除指定的记录
        /// </summary>
        /// <returns></returns>
        public bool HRemove(string key, string hashField)
        {
            return GetDatabase().HashDelete(key, hashField);
        }

        /// <summary>
        /// 存储对象到缓存中
        /// </summary>
        /// <param name="value">对象</param>
        public bool HSet(string key, string hashField, object value)
        {
            return GetDatabase().HashSet(key, hashField, Serialize(value));
        }
        public void HSet(string key, Dictionary<string, object> values)
        {
            BatchWrite(values, (batch, kv) =>
            {
                batch.HashSetAsync(key, kv.Key, Serialize(kv.Value));
            });
        }

        public T HGet<T>(string key, string hashField)
        {
            var a = HGet<T>(key, hashField, out var value);
            return value;
        }

        public bool HGet<T>(string key, string hashField, out T value)
        {
            if (string.IsNullOrEmpty(key))
            {
                throw new Exception("key为空");
            }
            if (string.IsNullOrEmpty(hashField))
            {
                throw new Exception("hashField为空");
            }
            value = default(T);
            var result = GetDatabase().HashGet(key, hashField);
            if (!result.HasValue)
            {
                return false;
            }
            value = Deserialize<T>(result);
            return true;
        }
        public Dictionary<string, Dictionary<string, T>> HGetAll<T>(IEnumerable<string> keys)
        {
            var dic = new Dictionary<string, Dictionary<string, T>>();
            var batch = GetDatabase().CreateBatch();
            var valueList = new Dictionary<string, Task<HashEntry[]>>();
            foreach (var key in keys)
            {
                valueList.Add(key, batch.HashGetAllAsync(key));
            }
            batch.Execute();
            foreach (var kv in valueList)
            {
                var list2 = new Dictionary<string, T>();
                foreach (var entry in kv.Value.Result)
                {
                    list2.Add(entry.Name, Deserialize<T>(entry.Value));
                }
                dic.Add(kv.Key, list2);
            }
            return dic;
        }
        public Dictionary<string, T> HGet<T>(string key, string[] hashFields)
        {
            var dic = new Dictionary<string, T>();
            var batch = GetDatabase().CreateBatch();
            var values = batch.HashGetAsync(key, hashFields.Select(b => (RedisValue)b).ToArray());
            batch.Execute();
            var result = values.Result;
            for (var i = 0; i < hashFields.Count(); i++)
            {
                var value = result[i];
                dic.Add(hashFields[i], Deserialize<T>(value));
            }
            return dic;
        }

        public bool HContainsKey(string key, string hashField)
        {
            if (string.IsNullOrEmpty(key))
            {
                throw new Exception("key为空");
            }
            if (string.IsNullOrEmpty(hashField))
            {
                throw new Exception("hashField为空");
            }
            return GetDatabase().HashExists(key, hashField);
        }
        public List<T> HGetAll<T>(string key)
        {
            var result = GetDatabase().HashGetAll(key);
            var list = new List<T>();
            foreach (var item in result)
            {
                var obj = Deserialize<T>(item.Value);
                list.Add(obj);
            }
            return list;
        }
        public Dictionary<string, T> HGetDic<T>(string key)
        {
            var result = GetDatabase().HashGetAll(key);
            return result.ToDictionary(b => b.Name.ToString(), b => Deserialize<T>(b.Value));
        }
        public List<string> HGetAllKeys(string key)
        {
            var allKeys = GetDatabase().HashKeys(key);
            if (allKeys.Length == 0)
            {
                return new List<string>();
            }
            return allKeys.Select(b => b.ToString()).ToList();
        }
        public long GetHashCount(string key)
        {
            return GetDatabase().HashLength(key);
        }

        #endregion

        /// <summary>
        /// 移除指定key的缓存
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public bool Remove(string key)
        {
            return GetDatabase().KeyDelete(key);
        }

        public long Increment(string key, string field, TimeSpan? timeOut = null, int num = 1)
        {
            key = string.Format("Increment_{0}", key);
            bool exists = false;
            if (timeOut != null)
            {
                exists = ContainsKey(key);
            }
            var result = GetDatabase().HashIncrement(key, field, num);
            if (timeOut != null && !exists)
            {
                //自动失效,清理垃圾数据
                KSetEntryIn(key, timeOut.Value);
            }
            return result;
        }
        /// <summary>
        /// 序列化对象
        /// </summary>
        /// <param name="o"></param>
        /// <returns></returns>
        static string Serialize(object o)
        {
            return StringHelper.SerializerToJson(o);
        }

        /// <summary>
        /// 反序列化对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="stream"></param>
        /// <returns></returns>
        static T Deserialize<T>(string data)
        {
            if (data == null)
            {
                return default(T);
            }
            return SerializeHelper.DeserializeFromJson<T>(data);
        }

        #region  消息发布
        /// <summary>
        /// 当作消息代理中间件使用
        /// 消息组建中,重要的概念便是生产者,消费者,消息中间件。
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        public void Publish(string channel, params string[] messages)
        {
            ISubscriber sub = GetInstance(_name).GetSubscriber();
            //return sub.Publish("messages", "hello");
            foreach (var message in messages)
            {
                sub.Publish(channel, message);
            }
        }
        public async Task PublishAsync(string channel, params string[] messages)
        {
            ISubscriber sub = GetInstance(_name).GetSubscriber();
            //return sub.Publish("messages", "hello");
            foreach (var message in messages)
            {
                await sub.PublishAsync(channel, message);
            }
        }
        /// <summary>
        /// 在消费者端得到该消息并输出
        /// </summary>
        /// <param name="channelName"></param>
        /// <returns></returns>
        public void Subscribe(string channelName, Action<string, string> callBack)
        {
            var sub = GetInstance(_name).GetSubscriber();
            sub.Subscribe(channelName, (channel, message) =>
            {
                try
                {
                    callBack(message, channel);
                }
                catch (Exception ero)
                {
                    Console.WriteLine($"RedisOnSubscribe {ero.Message}");
                    EventLog.Log(ero.ToString(), "RedisOnSubscribe");
                    throw ero;
                }
            });
        }
        public void UnSubscribe(string channelName)
        {
            var sub = GetInstance(_name).GetSubscriber();
            sub.Unsubscribe(channelName);
        }
        public Task SubscribeAsync(string channelName, Action<string, string> callBack)
        {
            var sub = GetInstance(_name).GetSubscriber();
            var task = sub.SubscribeAsync(channelName, (channel, message) =>
              {
                  try
                  {
                      callBack(message, channel);
                  }
                  catch (Exception ero)
                  {
                      EventLog.Log(ero.ToString(), "RedisOnSubscribe");
                      throw ero;
                  }
              });
            return task;
        }
        #endregion

        //public long ListRightPush<T>(string key, T value)
        //{
        //    return GetDatabase().ListRightPush(key, Serialize(value));
        //    //return GetDatabase().ListRightPush(key, Serialize(value));
        //}
        public long ListRightPush(string key, string value)
        {
            return GetDatabase().ListRightPush(key, value);
        }
        public long ListLeftPush(string key, string value)
        {
            return GetDatabase().ListLeftPush(key, value);
        }
        public long ListRemove(string key, object value)
        {
            return GetDatabase().ListRemove(key, Serialize(value));
        }
        public IEnumerable<string> ListRange(string key, long start, long end)
        {
            return GetDatabase().ListRange(key, start, end).Select(b => b.ToString());
        }
        public void ListTrim(string key, long start, long end)
        {
            GetDatabase().ListTrim(key, start, end);
        }
        public long ListLength(string key)
        {
            return GetDatabase().ListLength(key);
        }

        public bool BatchRemove(string keyPattern)
        {
            var result = GetDatabase().ScriptEvaluate(LuaScript.Prepare(
                //Redis的keys模糊查询：
                " local ks = redis.call('KEYS', @keypattern) " + //local ks为定义一个局部变量，其中用于存储获取到的keys
                " for i=1,#ks,5000 do " +    //#ks为ks集合的个数, 语句的意思： for(int i = 1; i <= ks.Count; i+=5000)
                "     redis.call('del', unpack(ks, i, math.min(i+4999, #ks))) " + //Lua集合索引值从1为起始，unpack为解包，获取ks集合中的数据，每次5000，然后执行删除
                " end " +
                " return true "
                ),
                new { keypattern = keyPattern });
            return true;
        }
        void BatchWrite(Dictionary<string, object> values, Action<IBatch, KeyValuePair<string, object>> func)
        {
            var batch = GetDatabase().CreateBatch();
            foreach (var kv in values)
            {
                func.Invoke(batch, kv);
            }
            batch.Execute();
        }
        #region bit
        public bool GetBit(string key, long offSet)
        {
            return GetDatabase().StringGetBit(key, offSet);
        }
        public void SetBit(string key, long offSet, bool bit)
        {
            GetDatabase().StringSetBit(key, offSet, bit);
        }
        #endregion
        #region geo
        public bool GeoAdd(string key, double longitude, double latitude, string value)
        {
            return GetDatabase().GeoAdd(key, longitude, latitude, value);
        }
        public GeoRadiusResult[] GeoRadius(string key, double longitude, double latitude, double radius)
        {
            return GetDatabase().GeoRadius(key, longitude, latitude, radius);
        }
        public bool GeoRemove(string key, string value)
        {
            return GetDatabase().GeoRemove(key, value);
        }
        #endregion
    }
}
